In [1]:
[reduce(+, 1:8) sum(1:8)]
Out[1]:
In [2]:
[reduce(*, 1:8) prod(1:8)]
Out[2]:
In [3]:
boring(a,b)=a   #Always keeps the left argument
@show reduce(boring, 1:8)   #Reduces to the first element
boring2(a,b)=b  #Always keeps the right argument
@show reduce(boring2, 1:8)  #Reduces to the last element
Out[3]:
You can also use reduce to compute Fibonacci numbers from their recurrence relation.
$$\begin{pmatrix} f_2 \\ f_1 \end{pmatrix} = \begin{pmatrix} f_1 + f_0 \\ f_1 \end{pmatrix} = \begin{pmatrix} 1 & 1 \\ 1 & 0 \end{pmatrix} \begin{pmatrix} f_1 \\ f_0 \end{pmatrix}$$

$$\begin{pmatrix} f_{n+1} \\ f_n \end{pmatrix} = \dots = \begin{pmatrix} 1 & 1 \\ 1 & 0 \end{pmatrix}^n \begin{pmatrix} f_1 \\ f_0 \end{pmatrix}$$

From this, you can show that
$$\begin{pmatrix} 1 & 1 \\ 1 & 0 \end{pmatrix}^n = \begin{pmatrix} f_{n+1} & f_n \\ f_n & f_{n-1} \end{pmatrix} $$
In [4]:
fib(j)=reduce(*, [[1 1; 1 0] for i=1:j])
map(fib, [4, 7])
Out[4]:
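Since the identity above puts f_n in the top-right entry of the matrix power, we can read the Fibonacci numbers off directly (a small check, not in the original):
map(j -> fib(j)[1,2], [4, 7])  #Extracts f_4 = 3 and f_7 = 13 from the matrix powers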
You can solve recurrences of any complexity using reduce. For example, reduce can compute a Hadamard matrix from its definition in terms of its submatrices:
$$H_{2n} = \begin{pmatrix} H_n & H_n \\ H_n & -H_n \end{pmatrix} = \begin{pmatrix} 1 & 1 \\ 1 & -1 \end{pmatrix} \otimes H_n$$
and so on.
In [5]:
Hadamard(n)=reduce(kron, [[1 1; 1 -1] for i=1:n])
Hadamard(3)
Out[5]:
In [6]:
ans*ans' #H*H' = 8I confirms this is a legitimate Hadamard matrix
Out[6]:
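Equivalently (a one-liner not in the original notebook), we can assert the defining identity programmatically:
Hadamard(3)*Hadamard(3)' == 8*eye(8)  #true: an n-by-n Hadamard matrix satisfies H*H' = n*I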
In [7]:
#Reduction of matrix multiplications
M=[randn(2,2) for i=1:4]
printnice(x)=println(round(x,3))
printnice(M[4]*M[3]*M[2]*M[1])
printnice(reduce((A,B)->B*A, M)) #backward multiply
printnice(reduce(*, M)) #forward multiply
In the following example we apply reduce() to function composition:
In [8]:
h=reduce((f,g)->(x->f(g(x))), [sin cos tan])
[h(pi) sin(cos(tan(pi)))]
Out[8]:
Having discussed reduce, we are now ready for the idea behind prefix sum. Suppose you want to compute the partial sums of a vector, i.e. given y[1:n], overwrite y with the vector of partial sums:
new_y[1] = y[1]
new_y[2] = y[1] + y[2]
new_y[3] = y[1] + y[2] + y[3]
...
At first blush, it seems impossible to parallelize this, since
new_y[1] = y[1]
new_y[2] = new_y[1] + y[2]
new_y[3] = new_y[2] + y[3]
...
which is an intrinsically serial process.
In [9]:
function prefix_serial!(y, +)
    @inbounds for i=2:length(y)
        y[i]=y[i-1]+y[i]   #Accumulate the sum of y[1:i] in place
    end
    y
end
Out[9]:
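As a quick sanity check (not in the original notebook), prefix_serial! with + reproduces cumsum:
[prefix_serial!(collect(1:8), +) cumsum(collect(1:8))]  #Two identical columns of partial sums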
However, it turns out that because addition (+) is associative, we can regroup the order in which these sums are carried out. (This of course extends to other associative operations such as multiplication.) Another ordering of 8 associative operations is provided by prefix8!:
In [10]:
function prefix8!(y, *)
    length(y)==8 || error("length 8 only")
    for i in [2,4,6,8]; y[i]=y[i-1]*y[i]; end
    for i in [  4,  8]; y[i]=y[i-2]*y[i]; end
    for i in [      8]; y[i]=y[i-4]*y[i]; end
    for i in [    6  ]; y[i]=y[i-2]*y[i]; end
    for i in [ 3,5,7 ]; y[i]=y[i-1]*y[i]; end
    y
end

function prefix!(y, *)
    l=length(y)
    k=int(ceil(log2(l)))
    @inbounds for j=1:k, i=2^j:2^j:min(l, 2^k)               #"reduce" tree
        y[i]=y[i-2^(j-1)]*y[i]
    end
    @inbounds for j=(k-1):-1:1, i=3*2^(j-1):2^j:min(l, 2^k)  #"broadcast" tree
        y[i]=y[i-2^(j-1)]*y[i]
    end
    y
end
Out[10]:
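prefix! handles arbitrary lengths, not just powers of two. A quick check (again not in the original) that it agrees with cumsum:
all([prefix!(collect(1:l), +) == cumsum(collect(1:l)) for l=1:20])  #true if every length agrees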
prefix! rearranges the operations to form two spanning trees:
In [11]:
using Compose
function gate(proc_from, proc_to, depth, num_procs;
radius_in=0.1/num_procs, radius_gate=0.25/num_procs)
const k::Int=ceil(log2(num_procs)) #Height of tree
in1 = (proc_from-0.5)/num_procs, depth/(2k) #Coordinates of input gate 1
in2 = (proc_to-0.5)/num_procs, depth/(2k) #Coordinates of input gate 2
op = (proc_to-0.5)/num_procs, (depth+0.5)/(2k) #Coordinates of gate operation
compose(canvas(),
compose(canvas(), circle(in1..., radius_in), circle(in2..., radius_in),
circle(op..., radius_gate), fill("white")),
compose(canvas(), lines(in1, op), lines(in2, op), linewidth(0.3mm)))
end
function drawcircuit(num_procs::Int)
circuit=Compose.CanvasTree[]
const k::Int=ceil(log2(num_procs)) #Height of tree
for j=1:k, i=2^j:2^j:min(num_procs, 2^k) #Draw "reduce" tree
push!(circuit, gate(i-2^(j-1), i, j, num_procs))
end
for j=(k-1):-1:1, i=3*2^(j-1):2^j:min(num_procs, 2^k) #Draw "broadcast" tree
push!(circuit, gate(i-2^(j-1), i, 2k-j, num_procs))
end
push!(circuit, compose(canvas(), #Draw vertical lines for each processor
[lines(((i-0.5)/num_procs,0),((i-0.5)/num_procs,1)) for i=1:num_procs]...,
linewidth(0.1mm), stroke("grey")))
compose(canvas(), circuit...)
end
@time drawcircuit(8)
Out[11]:
Now let's run prefix! in parallel on every core of the CPU. Use addprocs to attach additional worker processes.
In [12]:
@show Sys.CPU_CORES
@show nprocs() #Current number of processes
@show procs() #List of processes by index
#Add a specified number of processors
num_procs = 8 #Or use Sys.CPU_CORES to attach one worker per core
addprocs(max(0, num_procs-nprocs()))
@show nprocs() #Current number of processes
@show procs() #List of processes by index
import Base: fetch, length, +, * #Import explicitly so we can add methods for RemoteRefs
fetch(t::Vector) = map(fetch, t) #Vectorize fetch
#Define elementary operations on remote data
length(r1::RemoteRef)=length(fetch(r1))
+(r1::RemoteRef,r2::RemoteRef)=@spawnat r2.where fetch(r1)+fetch(r2)
*(r1::RemoteRef,r2::RemoteRef)=@spawnat r2.where fetch(r1)*fetch(r2)
Out[12]:
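As a quick illustration (this snippet is not in the original and assumes the workers attached above), the new * method multiplies two remote matrices on the process owning the right operand and hands back another RemoteRef:
a = @spawnat procs()[2] randn(2,2)   #A matrix living on one worker
b = @spawnat procs()[3] randn(2,2)   #Another matrix on a different worker
c = a*b                              #Dispatches to *(::RemoteRef, ::RemoteRef); data never touches the master process
norm(fetch(c) - fetch(a)*fetch(b))   #Zero up to roundoff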
In [13]:
#This line of code corresponds to one element of the diagram
# +(r1::RemoteRef,r2::RemoteRef)=@spawnat r2.where fetch(r1)+fetch(r2)
drawcircuit(2)
Out[13]:
For 8 processes, the serial version requires 7 operations. The parallel version uses 11 operations, but they are grouped into 5 rounds that can run simultaneously. Hence we expect the parallel version to run in 5/7 of the time needed for the naïve serial code, a theoretical speedup of 1.4x.
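We can verify these operation counts with a small counting wrapper (a sketch, not part of the original notebook):
ops = [0]
count_op(a,b) = (ops[1] += 1; a+b)   #Tallies every application of the operator
ops[1] = 0; prefix_serial!(collect(1:8), count_op); println(ops[1])   #7 operations
ops[1] = 0; prefix8!(collect(1:8), count_op); println(ops[1])         #11 operations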
In [15]:
#Before timing, force Julia to JIT compile the functions
for f in (prefix_serial!, prefix!)
    f([randn(3,3) for i=1:2], *)
end
n=2048
@time r=RemoteRef[@spawnat p randn(n,n) for p in procs()] #Create RemoteRefs
s=fetch(r) #These are in-memory matrices
t=copy(r) #These are RemoteRefs
tic(); prefix_serial!(s, *); t_ser = toc()
tic(); @sync prefix!(t, *); t_par = toc() #Caution: race condition bug #4330
@printf("Serial: %.3f sec Parallel: %.3f sec speedup: %.3fx (theory=1.4x)", t_ser, t_par, t_ser/t_par)
The exact same serial code now runs in parallel thanks to multiple dispatch!
In [18]:
#Which method is used with these arguments?
@show typeof(s[1])
which(*, s[1:2]...)
@show typeof(t[1])
which(*, t[1:2]...)